package com.flink_demo.demo.cep.source;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import com.flink_demo.demo.cep.event.MonitorEvent;
import com.flink_demo.demo.cep.event.SubMonitorEvent;

public class SubMonitorEventSource<T extends MonitorEvent> extends RichSourceFunction<T> {

	private static final long serialVersionUID = 1L;

	private Random random = new Random();
	private int intval = 100;
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	
	//使用托管的键控state(Using Managed Keyed State)
	@Override
	public void open(Configuration parameters) throws Exception {
		ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
		ValueState<Tuple2<Long, Long>> sum = getRuntimeContext().getState(descriptor);
	}
	
	
	
	public SubMonitorEventSource(int intval) {
		this.intval = intval;
	}

	public SubMonitorEventSource() {
	}

	@Override
	public void cancel() {

	}

	@Override
	public void run(SourceContext<T> ctx) throws Exception {
		while (true) {
			SubMonitorEvent m = new SubMonitorEvent();
			m.setId(random.nextInt(10));
			m.setFdate(sdf.format(new Date()));
			m.setValue(random.nextInt(10));
			ctx.collect((T) m);
			Thread.sleep(this.intval);
		}

	}

}
